/*
 * Copyright 2024-2025 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.cloud.ai.graph.streaming;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * ParallelGraphFlux is used to manage multiple GraphFlux instances for parallel nodes.
 *
 * Core features:
 * - Manage GraphFlux streams generated by multiple parallel nodes
 * - Maintain node identifier information for each stream
 * - Support stream merging and distribution processing
 * - Provide unified metadata management
 *
 * @author disaster
 * @since 1.0.4
 */
public class ParallelGraphFlux {

    /**
     * List of GraphFlux for parallel nodes
     */
    private final List<GraphFlux<?>> graphFluxes;

    /**
     * Global metadata for storing parallel processing related information
     */
    private final Map<String, Object> metadata;

    /**
     * Private constructor
     */
    private ParallelGraphFlux(List<GraphFlux<?>> graphFluxes, Map<String, Object> metadata) {
        this.graphFluxes = graphFluxes != null ? new ArrayList<>(graphFluxes) : new ArrayList<>();
        this.metadata = metadata != null ? new HashMap<>(metadata) : new HashMap<>();
    }

    /**
     * Static factory method to create ParallelGraphFlux instance
     *
     * @param graphFluxes List of GraphFlux
     * @return ParallelGraphFlux instance
     */
    public static ParallelGraphFlux of(List<GraphFlux<?>> graphFluxes) {
        return new ParallelGraphFlux(graphFluxes, null);
    }

    /**
     * Static factory method to create ParallelGraphFlux instance with metadata
     *
     * @param graphFluxes List of GraphFlux
     * @param metadata Metadata
     * @return ParallelGraphFlux instance
     */
    public static ParallelGraphFlux of(List<GraphFlux<?>> graphFluxes, Map<String, Object> metadata) {
        return new ParallelGraphFlux(graphFluxes, metadata);
    }

    /**
     * Create an empty ParallelGraphFlux instance
     *
     * @return Empty ParallelGraphFlux instance
     */
    public static ParallelGraphFlux empty() {
        return new ParallelGraphFlux(null, null);
    }

    /**
     * Add GraphFlux to the parallel list
     *
     * @param graphFlux GraphFlux to be added
     * @return Current ParallelGraphFlux instance, supporting chain calls
     */
    public ParallelGraphFlux addGraphFlux(GraphFlux<?> graphFlux) {
        if (graphFlux != null) {
            this.graphFluxes.add(graphFlux);
        }
        return this;
    }

    /**
     * Batch add GraphFlux to the parallel list
     *
     * @param graphFluxes List of GraphFlux to be added
     * @return Current ParallelGraphFlux instance, supporting chain calls
     */
    public ParallelGraphFlux addGraphFluxes(List<GraphFlux<?>> graphFluxes) {
        if (graphFluxes != null) {
            this.graphFluxes.addAll(graphFluxes);
        }
        return this;
    }

    /**
     * Get all GraphFlux list
     *
     * @return Read-only GraphFlux list
     */
    public List<GraphFlux<?>> getGraphFluxes() {
        return Collections.unmodifiableList(graphFluxes);
    }

    /**
     * Get metadata
     *
     * @return Metadata Map
     */
    public Map<String, Object> getMetadata() {
        return metadata;
    }

    /**
     * Check if it's empty
     *
     * @return true if there's no GraphFlux, otherwise false
     */
    public boolean isEmpty() {
        return graphFluxes.isEmpty();
    }

    /**
     * Get the number of GraphFlux
     *
     * @return Number of GraphFlux
     */
    public int size() {
        return graphFluxes.size();
    }

    /**
     * Get the corresponding GraphFlux by node ID
     *
     * @param nodeId Node ID
     * @return Corresponding GraphFlux, null if not found
     */
    public GraphFlux<?> getGraphFluxByNodeId(String nodeId) {
        return graphFluxes.stream()
                .filter(gf -> nodeId.equals(gf.getNodeId()))
                .findFirst()
                .orElse(null);
    }

    /**
     * Get all node IDs
     *
     * @return List of node IDs
     */
    public List<String> getNodeIds() {
        return graphFluxes.stream()
                .map(GraphFlux::getNodeId)
                .collect(Collectors.toList());
    }

    /**
     * Check if it contains GraphFlux with specified node ID
     *
     * @param nodeId Node ID
     * @return true if contains, otherwise false
     */
    public boolean containsNodeId(String nodeId) {
        return graphFluxes.stream()
                .anyMatch(gf -> nodeId.equals(gf.getNodeId()));
    }

    /**
     * Check if all GraphFlux have result mapping functions
     *
     * @return true if all GraphFlux have result mapping functions, false for empty collection
     */
    public boolean allHaveMapResult() {
        return !graphFluxes.isEmpty() && graphFluxes.stream()
                .allMatch(GraphFlux::hasMapResult);
    }

    /**
     * Check if any GraphFlux has result mapping function
     *
     * @return true if any GraphFlux has result mapping function, otherwise false
     */
    public boolean anyHaveMapResult() {
        return graphFluxes.stream()
                .anyMatch(GraphFlux::hasMapResult);
    }

    /**
     * Add global metadata
     *
     * @param key Key
     * @param value Value
     * @return Current ParallelGraphFlux instance, supporting chain calls
     */
    public ParallelGraphFlux withMetadata(String key, Object value) {
        this.metadata.put(key, value);
        return this;
    }

    /**
     * Add multiple global metadata
     *
     * @param metadata Metadata Map to be added
     * @return Current ParallelGraphFlux instance, supporting chain calls
     */
    public ParallelGraphFlux withMetadata(Map<String, Object> metadata) {
        if (metadata != null) {
            this.metadata.putAll(metadata);
        }
        return this;
    }


    /**
     * Create a new ParallelGraphFlux containing only GraphFlux that meet the conditions
     *
     * @param predicate Filter condition
     * @return New ParallelGraphFlux instance
     */
    public ParallelGraphFlux filter(java.util.function.Predicate<GraphFlux<?>> predicate) {
        List<GraphFlux<?>> filtered = graphFluxes.stream()
                .filter(predicate)
                .collect(Collectors.toList());
        return new ParallelGraphFlux(filtered, this.metadata);
    }

    @Override
    public String toString() {
        return String.format("ParallelGraphFlux{size=%d, nodeIds=%s, metadata=%s}",
                size(), getNodeIds(), metadata);
    }
}
